package com.shockweb.client;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.shockweb.bridge.DataAgreement;
import com.shockweb.bridge.OperationDefine;
import com.shockweb.common.International;
import com.shockweb.common.log.LogManager;
import com.shockweb.utils.Convert;
import com.shockweb.common.serializable.SerializableObject;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 链接到服务器的客户端，基础客户端通用实现
 * 
 * @author 彭明华
 * 2018年1月2日 创建
 */
public class Client implements Runnable {

    /**
     * Response operation code: the request succeeded and the response carries
     * no payload (the {@code send} methods return {@code null} for it).
     */
    public static final byte SUCCESS = 1;

    /**
     * Response operation code: the server reported an error (the {@code send}
     * methods turn it into a {@link ClientException}).
     */
    public static final byte ERROR = 2;

    /**
     * Request timeout in milliseconds.
     */
    private int timeOut = 5000;

    /**
     * Connection-establishment timeout in milliseconds.
     */
    private int connectTimeOut = 5000;

    /**
     * Writer-idle interval in milliseconds handed to the pipeline's
     * {@link IdleStateHandler}.
     */
    private int idleStateTime = 5 * 1000;

    /**
     * Pause in milliseconds between two polls while waiting for a connection
     * or a response; values {@code <= 0} disable the pause.
     */
    private int sleepTime = 20;

    /**
     * Client-side keep-alive heartbeat interval. Not read inside this class;
     * presumably consumed by collaborators through the getter (confirm).
     */
    private int clientIdleStateTime = 1000 * 5;

    /**
     * Client context holding the current channel and the pending requests.
     */
    private final ClientContext context = new ClientContext();

    /**
     * Address ("host" or "host:port") passed to the last {@link #connect(String)} call.
     */
    private String hostUrl = null;

    /**
     * Event loop group ({@link MultithreadEventLoopGroup} implementation used for
     * NIO {@link Selector} based {@link Channel}s) of the most recent connection attempt.
     */
    private volatile NioEventLoopGroup workerGroup;

    /**
     * {@link Bootstrap} used to set up the client {@link Channel} of the most
     * recent connection attempt.
     */
    private volatile Bootstrap bootstrap;

    /**
     * Failure recorded by the connection thread or the connect listener.
     * Volatile because it is written on those threads and read by the thread
     * blocked in {@link #connect(String)}.
     */
    private volatile Throwable error = null;

    /**
     * Set to {@code true} once a connection has been established. Volatile
     * because it is written on the Netty event loop and read by other threads.
     */
    public volatile boolean startFlag = false;

    /**
     * Creates a client with the default timeouts.
     */
    public Client() {
    }

    /**
     * Creates a client with explicit timing settings (all in milliseconds).
     *
     * @param timeOut request timeout
     * @param connectTimeOut connection-establishment timeout
     * @param sleepTime pause between two polls while waiting
     * @param idleStateTime writer-idle interval of the connection
     */
    public Client(int timeOut, int connectTimeOut, int sleepTime, int idleStateTime) {
        this.timeOut = timeOut;
        this.connectTimeOut = connectTimeOut;
        this.sleepTime = sleepTime;
        this.idleStateTime = idleStateTime;
    }

    /**
     * Sets the request timeout.
     *
     * @param timeOut timeout in milliseconds
     */
    public void setTimeOut(int timeOut) {
        this.timeOut = timeOut;
    }

    /**
     * Returns the request timeout.
     *
     * @return timeout in milliseconds
     */
    public int getTimeOut() {
        return timeOut;
    }

    /**
     * Sets the connection-establishment timeout.
     *
     * @param connectTimeOut timeout in milliseconds
     */
    public void setConnectTimeOut(int connectTimeOut) {
        this.connectTimeOut = connectTimeOut;
    }

    /**
     * Returns the connection-establishment timeout.
     *
     * @return timeout in milliseconds
     */
    public int getConnectTimeOut() {
        return connectTimeOut;
    }

    /**
     * Sets the writer-idle interval used for connections created afterwards.
     *
     * @param idleStateTime interval in milliseconds
     */
    public void setIdleStateTime(int idleStateTime) {
        this.idleStateTime = idleStateTime;
    }

    /**
     * Returns the writer-idle interval.
     *
     * @return interval in milliseconds
     */
    public int getIdleStateTime() {
        return idleStateTime;
    }

    /**
     * Returns the client context.
     *
     * @return the context shared by all requests of this client
     */
    public ClientContext getContext() {
        return context;
    }

    /**
     * Sets the pause between two polls while waiting.
     *
     * @param sleepTime pause in milliseconds; {@code <= 0} disables the pause
     */
    public void setSleepTime(int sleepTime) {
        this.sleepTime = sleepTime;
    }

    /**
     * Returns the pause between two polls while waiting.
     *
     * @return pause in milliseconds
     */
    public int getSleepTime() {
        return sleepTime;
    }

    /**
     * Returns the client-side keep-alive heartbeat interval.
     *
     * @return the configured interval
     */
    public int getClientIdleStateTime() {
        return clientIdleStateTime;
    }

    /**
     * Sets the client-side keep-alive heartbeat interval.
     *
     * @param clientIdleStateTime the interval to use
     */
    public void setClientIdleStateTime(int clientIdleStateTime) {
        this.clientIdleStateTime = clientIdleStateTime;
    }

    /**
     * Returns the address this client was asked to connect to.
     *
     * @return the address given to {@link #connect(String)}, or {@code null} if
     *         it was never called
     */
    public String getHostUrl() {
        return hostUrl;
    }

    /**
     * Starts a background connection thread and blocks until the channel is
     * available, the attempt fails, or {@code connectTimeOut} milliseconds elapse.
     *
     * @param hostUrl server address as "host" or "host:port" (port defaults to 3000)
     * @throws ClientException wrapping the connection failure, a
     *         {@link TimeoutException} on timeout, or an
     *         {@link InterruptedException} if the calling thread was interrupted
     */
    public final void connect(String hostUrl) throws ClientException {
        try {
            this.hostUrl = hostUrl;
            // Drop a failure left over from an earlier attempt so it cannot be
            // mistaken for the outcome of this one.
            this.error = null;
            // One clock for the whole attempt: the monitor wait and the polling
            // loop below share the same connectTimeOut budget.
            final long start = System.currentTimeMillis();
            new Thread(this).start();
            synchronized (context) {
                // Checked under the lock so the notification sent by the
                // connection thread cannot slip in between check and wait.
                if (context.getChannel() == null && getError() == null) {
                    context.wait(connectTimeOut);
                }
            }
            while (context.getChannel() == null) {
                Throwable failure = getError();
                if (failure != null) {
                    throw failure;
                }
                if (connectTimeOut < System.currentTimeMillis() - start) {
                    throw new TimeoutException("客户端创建连接接超时");
                }
                if (sleepTime > 0) {
                    Thread.sleep(sleepTime);
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's own code.
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        } catch (Throwable e) {
            throw new ClientException(e);
        }
    }

    /**
     * Body of the connection thread: connects to {@code hostUrl} and returns
     * only after the connection has ended or the attempt has failed. Failures
     * are recorded for {@link #connect(String)} to pick up.
     */
    @Override
    public void run() {
        startFlag = false;
        try {
            doConnect(hostUrl);
        } catch (ClientException e) {
            this.error = e;
        } catch (Exception e) {
            this.error = e;
            LogManager.errorLog(this.getClass(), "连接服务器失败 Server=" + hostUrl, e);
        } finally {
            // Wake a caller still blocked in connect(): failures raised before
            // the connect listener runs (e.g. a malformed port) would otherwise
            // only be noticed after the full timeout.
            synchronized (context) {
                context.notifyAll();
            }
            if (startFlag) {
                NioEventLoopGroup group = workerGroup;
                if (group != null) {
                    group.shutdownGracefully();
                }
                LogManager.infoLog(this.getClass(), "服务器连接停止 Server=" + hostUrl);
            }
        }
    }

    /**
     * Returns the failure recorded by the connection thread, if any.
     *
     * @return the recorded failure or {@code null}
     */
    private Throwable getError() {
        return error;
    }

    /**
     * Builds the Netty pipeline, connects to the server and blocks until the
     * channel is closed.
     *
     * @param centerServer server address as "host" or "host:port" (port defaults to 3000)
     * @throws ClientException if the thread is interrupted while waiting for
     *         the channel to close
     */
    private void doConnect(final String centerServer) throws ClientException {
        // Kept in a local so the finally block releases exactly the group
        // created by this attempt, even if the field is replaced meanwhile.
        NioEventLoopGroup group = null;
        try {
            final Bootstrap boot = new Bootstrap();
            bootstrap = boot;
            group = new NioEventLoopGroup();
            workerGroup = group;
            final Client client = this;

            boot.group(group);
            boot.channel(NioSocketChannel.class);
            boot.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Frames are prefixed with a 4-byte length that is stripped on read.
                    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
                    // Only writer idleness is monitored.
                    ch.pipeline().addLast(new IdleStateHandler(0, idleStateTime, 0, TimeUnit.MILLISECONDS));
                    ch.pipeline().addLast(new ClientNettyHandler(client));
                }
            });

            // Pooled allocator so buffers are reused.
            boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            boot.option(ChannelOption.SO_KEEPALIVE, true);
            boot.option(ChannelOption.TCP_NODELAY, true);
            boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeOut);

            String[] parts = centerServer.split(":");
            String host = parts[0].trim();
            int port = 3000;
            if (parts.length > 1) {
                port = Integer.parseInt(parts[1].trim());
            }

            error = null;
            ChannelFuture connectFuture = boot.connect(host, port).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        startFlag = true;
                        context.setChannel(f.channel());
                        // Log under the client class, not the anonymous listener class.
                        LogManager.infoLog(client.getClass(), "连接服务器成功 Server=" + centerServer);
                    } else {
                        error = f.cause();
                    }
                    synchronized (context) {
                        context.notifyAll();
                    }
                }
            });
            // Park this thread until the channel is closed.
            connectFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            this.error = e;
            LogManager.errorLog(this.getClass(), "客户端创建连失败 Server=" + centerServer, e);
            throw new ClientException(e);
        } finally {
            // Release the channel reference and the event loop threads.
            context.setChannel(null);
            if (group != null) {
                group.shutdownGracefully();
            }
            LogManager.infoLog(this.getClass(), "客户端连接断开 Server=" + centerServer);
        }
    }

    /**
     * Closes the connection and shuts down the event loop group. Does nothing
     * if no connection was ever established.
     */
    final public void close() {
        if (startFlag) {
            try {
                // Read once: the connection thread may null the channel concurrently.
                Channel channel = context.getChannel();
                if (channel != null) {
                    channel.close();
                }
            } finally {
                NioEventLoopGroup group = workerGroup;
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
        }
    }

    /**
     * Sends raw bytes and waits for the raw response payload.
     *
     * @param operation operation code of the request
     * @param data request payload, may be {@code null}
     * @return the response payload without the operation/uuid header, or
     *         {@code null} when the server answered {@link #SUCCESS}
     * @throws ClientException if sending fails, the request times out, the
     *         calling thread is interrupted, or the server answered {@link #ERROR}
     */
    public byte[] send(OperationDefine operation, byte[] data) throws ClientException {
        try {
            String uuid = UUID.randomUUID().toString();
            context.init(uuid);
            Request req = context.getRequest(uuid);
            try {
                clientSend(operation, uuid, data);
                awaitResponse(req, operation, uuid);
                byte[] src = req.data;
                if (!carriesPayload(req.data)) {
                    return null;
                }
                int headerLength = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
                return SerializableObject.copyOfRange(src, headerLength, src.length - headerLength);
            } finally {
                context.destroy(uuid);
            }
        } catch (ClientException e) {
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException("send方法异常", e);
        } catch (Throwable e) {
            throw new ClientException("send方法异常", e);
        }
    }

    /**
     * Performs a remote call without interpreting request or response: the
     * complete response bytes are returned as received.
     *
     * @param uuid caller-supplied request id used to correlate the response
     * @param operation operation code of the request
     * @param data request payload, may be {@code null}
     * @return the complete response bytes
     * @throws ClientException if sending fails, the request times out or the
     *         calling thread is interrupted
     */
    public byte[] rpc(String uuid, OperationDefine operation, byte[] data) throws ClientException {
        try {
            context.init(uuid);
            Request req = context.getRequest(uuid);
            try {
                clientSend(operation, uuid, data);
                awaitResponse(req, operation, uuid);
                return req.data;
            } finally {
                context.destroy(uuid);
            }
        } catch (ClientException e) {
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException("send方法异常", e);
        } catch (Throwable e) {
            throw new ClientException("send方法异常", e);
        }
    }

    /**
     * Sends an object and waits for an object response.
     *
     * @param operation operation code of the request
     * @param data request object, converted to bytes with {@link Convert}
     * @param clazz type the response payload is converted to
     * @return the converted response object, or {@code null} when the server
     *         answered {@link #SUCCESS}
     * @throws ClientException if sending fails, the request times out, the
     *         calling thread is interrupted, or the server answered {@link #ERROR}
     */
    public Object send(OperationDefine operation, Object data, Class<?> clazz) throws ClientException {
        try {
            String uuid = UUID.randomUUID().toString();
            context.init(uuid);
            Request req = context.getRequest(uuid);
            try {
                clientSend(operation, uuid, Convert.convertToBytes(data));
                awaitResponse(req, operation, uuid);
                byte[] src = req.data;
                if (!carriesPayload(req.data)) {
                    return null;
                }
                int headerLength = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
                return Convert.convertToObject(src, headerLength, src.length - headerLength, clazz);
            } finally {
                context.destroy(uuid);
            }
        } catch (ClientException e) {
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException("send方法异常", e);
        } catch (Throwable e) {
            throw new ClientException("send方法异常", e);
        }
    }

    /**
     * Blocks until the given request has been answered or {@code timeOut}
     * milliseconds have elapsed.
     *
     * @param req pending request registered in the context
     * @param operation operation code, used in the timeout message only
     * @param uuid request id the context waits on
     * @throws ClientException if the response was recorded with an exception
     * @throws Exception a {@link TimeoutException} on timeout, an
     *         {@link InterruptedException} on interruption, or whatever the
     *         context raises while waiting
     */
    private void awaitResponse(Request req, OperationDefine operation, String uuid)
            throws ClientException, Exception {
        final long start = System.currentTimeMillis();
        context.lockWait(uuid, timeOut);
        // Poll after the wait returns until the response is flagged or the
        // timeout has elapsed.
        while (!req.returnValue) {
            if (timeOut < System.currentTimeMillis() - start) {
                throw new TimeoutException("客户端send方法发送超时，operation=" + operation + ",uuid=" + uuid);
            }
            if (sleepTime > 0) {
                Thread.sleep(sleepTime);
            }
        }
        if (req.exception != null) {
            throw new ClientException("接收应答数据异常", req.exception);
        }
    }

    /**
     * Inspects the operation code of a response.
     *
     * @param src complete response bytes
     * @return {@code false} for a {@link #SUCCESS} response, {@code true} when
     *         the response carries a payload
     * @throws ClientException if the response is an {@link #ERROR} response
     * @throws Exception whatever {@link DataAgreement} raises while parsing
     */
    private boolean carriesPayload(byte[] src) throws ClientException, Exception {
        byte rtnOper = DataAgreement.resolutionOperation(src);
        if (rtnOper == SUCCESS) {
            return false;
        }
        if (rtnOper == ERROR) {
            throw new ClientException(DataAgreement.resolutionError(src));
        }
        return true;
    }

    /**
     * Tells whether the current connection can be used.
     *
     * @return {@code true} if a channel exists and is open, active and writable
     */
    public boolean isActive() {
        Channel channel = context.getChannel();
        return (channel != null && channel.isOpen() && channel.isActive() && channel.isWritable());
    }

    /**
     * Writes one request frame (operation byte, uuid bytes, optional payload)
     * to the server. The write is asynchronous; a failed write is only logged.
     *
     * @param operation operation code of the request
     * @param uuid request id
     * @param data request payload, may be {@code null}
     * @throws IOException if there is no connection or the uuid cannot be encoded
     */
    private void clientSend(OperationDefine operation, String uuid, byte[] data) throws IOException {
        Channel channel = context.getChannel();
        if (channel == null) {
            // Explicit failure instead of a NullPointerException further down.
            throw new IOException("客户端尚未连接或连接已断开");
        }
        // Encode the id before allocating so an encoding failure cannot leak a buffer.
        byte[] idBytes = uuid.getBytes(International.CHARSET);
        int len = DataAgreement.BYTE_LENGTH + DataAgreement.UUID_LENGTH;
        if (data != null) {
            len = len + data.length;
        }
        ByteBufAllocator alloc = channel.alloc();
        ByteBuf buf = alloc.buffer(len);
        boolean filled = false;
        try {
            buf.writeByte(operation.value());
            buf.writeBytes(idBytes);
            if (data != null) {
                buf.writeBytes(data);
            }
            filled = true;
        } finally {
            if (!filled) {
                // The buffer never reached the pipeline, so it must be released here.
                buf.release();
            }
        }
        channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    LogManager.errorLog(Client.class, "客户端发送信息失败", f.cause());
                }
            }
        });
    }
}
